#!/usr/bin/env python

'''Show that a cluster of four servers auto-reconfigures as expected when one of them fails'''

import copy, os, pprint, sys, time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir, 'common')))
import driver, rdb_unittest, utils

# ==

# number of records written to every table before KingHamlet is killed;
# also the baseline for the record-count check at the end of the test
recordsToAdd = 10

# ==

class SingleServerFailure(rdb_unittest.RdbTestCase):
    '''Kill one server (KingHamlet) of a four-server cluster and check how tables react.

    Every scenario below is created twice, once with `write_acks` set to
    `single` and once with `majority`. The test then checks the reported
    issues, the read/write availability and the primary of every table while
    KingHamlet is down, and that everything is back to normal once it has
    been restarted.
    '''

    servers = ['PrinceHamlet', 'KingHamlet', 'Gertrude', 'Polonius'] # note: self.conn will be to the first in this list

    def _checkAccess(self, table, statusKey, operation, query, **runOptions):
        '''Run `query` and assert that it succeeds or fails as the scenario predicts.

        table      -- scenario dict; `table['status'][statusKey]` says whether the query should work
        statusKey  -- one of the keys of the scenario's `status` dict
        operation  -- human readable name of the operation, only used in failure messages
        query      -- the ReQL query to run on `self.conn`
        runOptions -- extra keyword arguments handed to `run` (e.g. `read_mode`)
        '''
        shouldWork = table['status'][statusKey]
        try:
            result = query.run(self.conn, **runOptions)
        except self.r.RqlError as err:
            self.assertFalse(shouldWork, 'Table %s should have been available for %s but got error: %s' % (table['name'], operation, str(err)))
        else:
            self.assertTrue(shouldWork, 'Table %s should not have been available for %s but was: %r' % (table['name'], operation, result))

    def test_primary(self):
        '''Kill KingHamlet, verify issues/availability/primaries, restart it and verify recovery.'''

        # -- add a proxy server to the cluster

        # the name is not used again; the binding is kept so the object stays referenced for the whole test
        proxyServer = driver.ProxyProcess(cluster=self.cluster, console_output=True)

        # -- data

        # One entry per scenario:
        #   primary/replicas/nonvoting_replicas - the initial shard configuration
        #   primaryEffect - what should be reported as primary while KingHamlet is down:
        #                   'same' (unchanged), 'changed' (someone else), 'none' (no primary)
        #   status - the status the availability issue must report while KingHamlet is down;
        #            also used as the expectation for the read/write checks
        #   issue  - template of the expected issue description (None: no issue expected),
        #            %(db)s and %(name)s are filled in once the final table name is known
        notAllReadyIssue = 'Table `%(db)s.%(name)s` is available for all operations, but some replicas are not ready. The following servers are not reachable: KingHamlet'
        outdatedOnlyIssue = 'Table `%(db)s.%(name)s` is available for outdated reads, but not up-to-date reads or writes. The following servers are not reachable: KingHamlet'
        unavailableIssue = 'Table `%(db)s.%(name)s` is not available for any operations. No servers are reachable for this table.'

        scenarios = [
            # KingHamlet does not host this table at all
            {   'name': 'no_effect',
                'primary': 'PrinceHamlet', 'primaryEffect': 'same',
                'replicas': ['PrinceHamlet', 'Gertrude', 'Polonius'], 'nonvoting_replicas':['Polonius'],
                'status': { 'all_replicas_ready': True,
                             'ready_for_outdated_reads': True,
                             'ready_for_reads': True,
                             'ready_for_writes': True},
                'issue': None},
            # KingHamlet is a secondary replica
            {   'name': 'fully_available',
                'primary': 'PrinceHamlet', 'primaryEffect': 'same',
                'replicas': ['PrinceHamlet', 'KingHamlet', 'Gertrude', 'Polonius'], 'nonvoting_replicas':['Polonius'],
                'status': { 'all_replicas_ready': False,
                             'ready_for_outdated_reads': True,
                             'ready_for_reads': True,
                             'ready_for_writes': True},
                'issue': notAllReadyIssue},
            # KingHamlet is the primary, two other voting replicas remain
            {   'name': 'change_primary',
                'primary': 'KingHamlet', 'primaryEffect': 'changed',
                'replicas': ['PrinceHamlet', 'KingHamlet', 'Gertrude', 'Polonius'], 'nonvoting_replicas':['Polonius'],
                'status': { 'all_replicas_ready': False,
                             'ready_for_outdated_reads': True,
                             'ready_for_reads': True,
                             'ready_for_writes': True},
                'issue': notAllReadyIssue},
            # KingHamlet is the primary and one of only two voting replicas
            {   'name': 'outdated_reads',
                'primary': 'KingHamlet', 'primaryEffect': 'none',
                'replicas': ['PrinceHamlet', 'KingHamlet', 'Polonius'], 'nonvoting_replicas':['Polonius'],
                'status': { 'all_replicas_ready': False,
                             'ready_for_outdated_reads': True,
                             'ready_for_reads': False,
                             'ready_for_writes': False},
                'issue': outdatedOnlyIssue},
            # KingHamlet is the primary, the only other replica is non-voting
            {   'name': 'nonvoter',
                'primary': 'KingHamlet', 'primaryEffect': 'none',
                'replicas': ['KingHamlet', 'Polonius'], 'nonvoting_replicas':['Polonius'],
                'status': { 'all_replicas_ready': False,
                             'ready_for_outdated_reads': True,
                             'ready_for_reads': False,
                             'ready_for_writes': False},
                'issue': outdatedOnlyIssue},
            # KingHamlet is the only replica
            {   'name': 'unavailable',
                'primary': 'KingHamlet', 'primaryEffect': 'none',
                'replicas': ['KingHamlet'], 'nonvoting_replicas':[],
                'status': { 'all_replicas_ready': False,
                             'ready_for_outdated_reads': False,
                             'ready_for_reads': False,
                             'ready_for_writes': False},
                'issue': unavailableIssue},
        ]

        # - make both write_acks versions (all `_single` tables first, then all `_majority` ones)
        tables = []
        for ackMode in ('single', 'majority'):
            for scenario in scenarios:
                table = copy.deepcopy(scenario)
                table['name'] = '%s_%s' % (scenario['name'], ackMode)
                table['write_acks'] = ackMode
                if table['issue']:
                    # use the real database name rather than assuming it is `test`
                    table['issue'] = table['issue'] % {'db': self.dbName, 'name': table['name']}
                tables.append(table)

        # -- create tables

        utils.print_with_time("Creating %d tables" % len(tables))
        newTableInfo = []
        for table in tables:
            shard = {
                'primary_replica': table['primary'],
                'replicas': table['replicas']
            }
            if table['nonvoting_replicas']:
                shard['nonvoting_replicas'] = table['nonvoting_replicas']
            newTableInfo.append({
                'db': self.dbName,
                'name': table['name'],
                'shards': [shard],
                'write_acks': table['write_acks']
            })
        res = self.r.db("rethinkdb").table("table_config").insert(newTableInfo).run(self.conn)
        self.assertEqual(res['inserted'], len(tables), 'Table creation was not sucessfull:\n%sFor config:\n%s' % (pprint.pformat(res), pprint.pformat(newTableInfo)))
        self.db.wait(wait_for="all_replicas_ready").run(self.conn)

        # - fill each with a small amount of data

        for table in tables:
            utils.populateTable(self.conn, self.db.table(table['name']), records=recordsToAdd)

        # -- kill the KingHamlet server

        utils.print_with_time("Killing KingHamlet")
        self.cluster['KingHamlet'].kill()

        # - check that we see all the issue (simultaneously)

        utils.print_with_time("Checking for expected issues")
        expectedIssues = dict((x['issue'], x) for x in tables if x['issue'])
        issuesQuery = self.r.db('rethinkdb').table('current_issues').filter(self.r.row["type"] != "memory_error")

        # Poll until one single snapshot of `current_issues` contains every expected
        # issue or the deadline has passed. The loop body always runs at least once,
        # so the round* variables are always populated for the checks below.
        deadline = time.time() + 20
        while True:
            roundActualIssues = list(issuesQuery.run(self.conn))
            roundExpectedIssues = copy.copy(expectedIssues)
            roundExtraIssues = [] # there are other transition messages, quick enough that we might not see them

            for actualIssue in roundActualIssues:
                description = actualIssue['description']
                if description not in roundExpectedIssues:
                    roundExtraIssues.append(actualIssue)
                    continue
                expectedIssue = roundExpectedIssues.pop(description)
                self.assertEqual(actualIssue['type'], 'table_availability', pprint.pformat(actualIssue))
                self.assertEqual(actualIssue['info']['db'], self.dbName, pprint.pformat(actualIssue))
                self.assertEqual(actualIssue['info']['table'], expectedIssue['name'], pprint.pformat(actualIssue))
                self.assertEqual(actualIssue['info']['status'], expectedIssue['status'], pprint.pformat(actualIssue))

            if not roundExpectedIssues or time.time() >= deadline:
                break
            time.sleep(0.25) # do not hammer the server while waiting
        if roundExpectedIssues:
            self.fail('Missing issues after killing KingHamlet:\n%s\nActual:\n%s' % (pprint.pformat(sorted(roundExpectedIssues)), pprint.pformat(roundActualIssues)))
        if roundExtraIssues:
            self.fail('Got unexpected issues after killing KingHamlet:\n%s' % pprint.pformat(roundExtraIssues))

        # -- check that the tables respond as expected

        utils.print_with_time("Checking the status of the tables")
        for table in tables:
            tableQuery = self.db.table(table['name'])
            self._checkAccess(table, 'ready_for_reads', 'normal reads', tableQuery.limit(1))
            self._checkAccess(table, 'ready_for_outdated_reads', 'outdated reads', tableQuery.limit(1), read_mode='outdated')
            self._checkAccess(table, 'ready_for_writes', 'writes', tableQuery.insert({}))

        # - expected primary (or change)
        tableStatus = dict((x['name'], x) for x in self.r.db('rethinkdb').table('table_status').filter({'db':self.dbName}).run(self.conn))
        for table in tables:
            name = table['name']
            self.assertTrue(name in tableStatus, 'table %s was unexpectedly not in table_status: %r' % (name, tableStatus))
            status = tableStatus[name]
            effect = table['primaryEffect']
            if effect == 'same':
                self.assertEqual(status['shards'][0]['primary_replicas'][0], table['primary'], 'table %s was not using the inital priamry: %r' % (name, status))
            elif effect == 'changed':
                self.assertNotEqual(status['shards'][0]['primary_replicas'][0], table['primary'], 'table %s was unexpectedly still using the inital priamry: %r' % (name, status))
            elif effect == 'none':
                if status.get('shards'): # unavailable_* does not show anything because we have no observers
                    self.assertEqual(status['shards'][0]['primary_replicas'], [])

        # -- restore KingHamlet

        utils.print_with_time("Re-animating KingHamlet")
        self.cluster['KingHamlet'].start()
        self.db.wait(wait_for="all_replicas_ready").run(self.conn)

        # -- check that all issues are resolved

        utils.print_with_time("Checking that the tables have returned normal")

        # - check that all issues are resolved

        issues = list(issuesQuery.run(self.conn))
        self.assertEqual(issues, [])

        # - check that all tables are fully available (any error raised here fails the test)

        for table in tables:
            tableQuery = self.db.table(table['name'])

            # - normal read
            tableQuery.limit(1).run(self.conn)

            # - outdated read
            tableQuery.limit(1).run(self.conn, read_mode='outdated')

            # - write
            tableQuery.insert({}).run(self.conn)

        # - expected primary

        tableStatus = dict((x['name'], x) for x in self.r.db('rethinkdb').table('table_status').filter({'db':self.dbName}).run(self.conn))
        for table in tables:
            status = tableStatus[table['name']]
            self.assertEqual(status['shards'][0]['primary_replicas'][0], table['primary'], 'table %s was not using the inital priamry: %r' % (table['name'], status))

        # - expected number of records

        for table in tables:
            records = self.db.table(table['name']).count().run(self.conn)
            # one insert was attempted while KingHamlet was down (it only succeeded on
            # tables that were ready for writes) and one more after it came back
            additionalRecords = 2 if table['status']['ready_for_writes'] else 1
            self.assertEqual(records, recordsToAdd + additionalRecords, 'Table %s has %d records rather than the expected %d' % (table['name'], records, recordsToAdd + additionalRecords))

# when executed as a script hand control to the shared test runner in rdb_unittest
if __name__ == '__main__':
    rdb_unittest.main()
        
